// Copyright 2015 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "content/child/shared_memory_data_consumer_handle.h"

#include <algorithm>
#include <deque>
#include <utility>

#include "base/bind.h"
#include "base/macros.h"
#include "base/memory/ptr_util.h"
#include "base/message_loop/message_loop.h"
#include "base/single_thread_task_runner.h"
#include "base/synchronization/lock.h"
#include "base/threading/thread_task_runner_handle.h"
#include "content/public/child/fixed_received_data.h"

namespace content {

namespace {

    class DelegateThreadSafeReceivedData final
        : public RequestPeer::ThreadSafeReceivedData {
    public:
        explicit DelegateThreadSafeReceivedData(
            std::unique_ptr<RequestPeer::ReceivedData> data)
            : data_(std::move(data))
            , task_runner_(base::ThreadTaskRunnerHandle::Get())
        {
        }
        ~DelegateThreadSafeReceivedData() override
        {
            if (!task_runner_->BelongsToCurrentThread()) {
                // Delete the data on the original thread.
                task_runner_->DeleteSoon(FROM_HERE, data_.release());
            }
        }

        const char* payload() const override { return data_->payload(); }
        int length() const override { return data_->length(); }

    private:
        std::unique_ptr<RequestPeer::ReceivedData> data_;
        scoped_refptr<base::SingleThreadTaskRunner> task_runner_;

        DISALLOW_COPY_AND_ASSIGN(DelegateThreadSafeReceivedData);
    };

} // namespace

using Result = blink::WebDataConsumerHandle::Result;

// All methods (except for ctor/dtor) must be called with |lock_| aquired
// unless otherwise stated.
class SharedMemoryDataConsumerHandle::Context final
    : public base::RefCountedThreadSafe<Context> {
public:
    explicit Context(const base::Closure& on_reader_detached)
        : result_(Ok)
        , first_offset_(0)
        , client_(nullptr)
        , writer_task_runner_(base::ThreadTaskRunnerHandle::Get())
        , on_reader_detached_(on_reader_detached)
        , is_on_reader_detached_valid_(!on_reader_detached_.is_null())
        , is_handle_active_(true)
        , is_two_phase_read_in_progress_(false)
    {
    }

    bool IsEmpty() const
    {
        lock_.AssertAcquired();
        return queue_.empty();
    }
    void ClearIfNecessary()
    {
        lock_.AssertAcquired();
        if (!is_handle_locked() && !is_handle_active()) {
            // No one is interested in the contents.
            if (is_on_reader_detached_valid_) {
                // We post a task even in the writer thread in order to avoid a
                // reentrance problem as calling |on_reader_detached_| may manipulate
                // the context synchronously.
                writer_task_runner_->PostTask(FROM_HERE, on_reader_detached_);
            }
            Clear();
        }
    }
    void ClearQueue()
    {
        lock_.AssertAcquired();
        queue_.clear();
        first_offset_ = 0;
    }
    RequestPeer::ThreadSafeReceivedData* Top()
    {
        lock_.AssertAcquired();
        return queue_.front().get();
    }
    void Push(std::unique_ptr<RequestPeer::ThreadSafeReceivedData> data)
    {
        lock_.AssertAcquired();
        queue_.push_back(std::move(data));
    }
    size_t first_offset() const
    {
        lock_.AssertAcquired();
        return first_offset_;
    }
    Result result() const
    {
        lock_.AssertAcquired();
        return result_;
    }
    void set_result(Result r)
    {
        lock_.AssertAcquired();
        result_ = r;
    }
    void AcquireReaderLock(Client* client)
    {
        lock_.AssertAcquired();
        DCHECK(!notification_task_runner_);
        DCHECK(!client_);
        notification_task_runner_ = base::ThreadTaskRunnerHandle::Get();
        client_ = client;
        if (client && !(IsEmpty() && result() == Ok)) {
            // We cannot notify synchronously because the user doesn't have the reader
            // yet.
            notification_task_runner_->PostTask(
                FROM_HERE, base::Bind(&Context::NotifyInternal, this, false));
        }
    }
    void ReleaseReaderLock()
    {
        lock_.AssertAcquired();
        DCHECK(notification_task_runner_);
        notification_task_runner_ = nullptr;
        client_ = nullptr;
    }
    void PostNotify()
    {
        lock_.AssertAcquired();
        auto runner = notification_task_runner_;
        if (!runner)
            return;
        // We don't re-post the task when the runner changes while waiting for
        // this task because in this case a new reader is obtained and
        // notification is already done at the reader creation time if necessary.
        runner->PostTask(FROM_HERE,
            base::Bind(&Context::NotifyInternal, this, false));
    }
    // Must be called with |lock_| not aquired.
    void Notify() { NotifyInternal(true); }
    // This function doesn't work in the destructor if |on_reader_detached_| is
    // not null.
    void ResetOnReaderDetached()
    {
        lock_.AssertAcquired();
        if (on_reader_detached_.is_null()) {
            DCHECK(!is_on_reader_detached_valid_);
            return;
        }
        is_on_reader_detached_valid_ = false;
        if (writer_task_runner_->BelongsToCurrentThread()) {
            // We can reset the closure immediately.
            on_reader_detached_.Reset();
        } else {
            // We need to reset |on_reader_detached_| on the right thread because it
            // might lead to the object destruction.
            writer_task_runner_->PostTask(
                FROM_HERE, base::Bind(&Context::ResetOnReaderDetachedWithLock, this));
        }
    }
    bool is_handle_locked() const
    {
        lock_.AssertAcquired();
        return static_cast<bool>(notification_task_runner_);
    }
    bool IsReaderBoundToCurrentThread() const
    {
        lock_.AssertAcquired();
        return notification_task_runner_ && notification_task_runner_->BelongsToCurrentThread();
    }
    bool is_handle_active() const
    {
        lock_.AssertAcquired();
        return is_handle_active_;
    }
    void set_is_handle_active(bool b)
    {
        lock_.AssertAcquired();
        is_handle_active_ = b;
    }
    void Consume(size_t s)
    {
        lock_.AssertAcquired();
        first_offset_ += s;
        auto* top = Top();
        if (static_cast<size_t>(top->length()) <= first_offset_) {
            queue_.pop_front();
            first_offset_ = 0;
        }
    }
    bool is_two_phase_read_in_progress() const
    {
        lock_.AssertAcquired();
        return is_two_phase_read_in_progress_;
    }
    void set_is_two_phase_read_in_progress(bool b)
    {
        lock_.AssertAcquired();
        is_two_phase_read_in_progress_ = b;
    }
    // Can be called with |lock_| not aquired.
    base::Lock& lock() { return lock_; }

private:
    // Must be called with |lock_| not aquired.
    void NotifyInternal(bool repost)
    {
        scoped_refptr<base::SingleThreadTaskRunner> runner;
        {
            base::AutoLock lock(lock_);
            runner = notification_task_runner_;
        }
        if (!runner)
            return;

        if (runner->BelongsToCurrentThread()) {
            // It is safe to access member variables without lock because |client_|
            // is bound to the current thread.
            if (client_)
                client_->didGetReadable();
            return;
        }
        if (repost) {
            // We don't re-post the task when the runner changes while waiting for
            // this task because in this case a new reader is obtained and
            // notification is already done at the reader creation time if necessary.
            runner->PostTask(FROM_HERE,
                base::Bind(&Context::NotifyInternal, this, false));
        }
    }
    void Clear()
    {
        lock_.AssertAcquired();
        ClearQueue();
        client_ = nullptr;
        ResetOnReaderDetached();
    }
    // Must be called with |lock_| not aquired.
    void ResetOnReaderDetachedWithLock()
    {
        base::AutoLock lock(lock_);
        ResetOnReaderDetached();
    }

    friend class base::RefCountedThreadSafe<Context>;
    ~Context() = default;

    base::Lock lock_;
    // |result_| stores the ultimate state of this handle if it has. Otherwise,
    // |Ok| is set.
    Result result_;
    std::deque<std::unique_ptr<RequestPeer::ThreadSafeReceivedData>> queue_;
    size_t first_offset_;
    Client* client_;
    scoped_refptr<base::SingleThreadTaskRunner> notification_task_runner_;
    scoped_refptr<base::SingleThreadTaskRunner> writer_task_runner_;
    base::Closure on_reader_detached_;
    // We need this boolean variable to remember if |on_reader_detached_| is
    // callable because we need to reset |on_reader_detached_| only on the writer
    // thread and hence |on_reader_detached_.is_null()| is untrustworthy on
    // other threads.
    bool is_on_reader_detached_valid_;
    bool is_handle_active_;
    bool is_two_phase_read_in_progress_;

    DISALLOW_COPY_AND_ASSIGN(Context);
};

SharedMemoryDataConsumerHandle::Writer::Writer(
    const scoped_refptr<Context>& context,
    BackpressureMode mode)
    : context_(context)
    , mode_(mode)
{
}

SharedMemoryDataConsumerHandle::Writer::~Writer()
{
    Close();
    base::AutoLock lock(context_->lock());
    context_->ResetOnReaderDetached();
}

void SharedMemoryDataConsumerHandle::Writer::AddData(
    std::unique_ptr<RequestPeer::ReceivedData> data)
{
    if (!data->length()) {
        // We omit empty data.
        return;
    }

    bool needs_notification = false;
    {
        base::AutoLock lock(context_->lock());
        if (!context_->is_handle_active() && !context_->is_handle_locked()) {
            // No one is interested in the data.
            return;
        }

        needs_notification = context_->IsEmpty();
        std::unique_ptr<RequestPeer::ThreadSafeReceivedData> data_to_pass;
        if (mode_ == kApplyBackpressure) {
            data_to_pass = base::MakeUnique<DelegateThreadSafeReceivedData>(std::move(data));
        } else {
            data_to_pass = base::MakeUnique<FixedReceivedData>(data.get());
        }
        context_->Push(std::move(data_to_pass));
    }

    if (needs_notification) {
        // We CAN issue the notification synchronously if the associated reader
        // lives in this thread, because this function cannot be called in the
        // client's callback.
        context_->Notify();
    }
}

void SharedMemoryDataConsumerHandle::Writer::Close()
{
    base::AutoLock lock(context_->lock());
    if (context_->result() == Ok) {
        context_->set_result(Done);
        context_->ResetOnReaderDetached();
        if (context_->IsEmpty()) {
            // We cannot issue the notification synchronously because this function
            // can be called in the client's callback.
            context_->PostNotify();
        }
    }
}

void SharedMemoryDataConsumerHandle::Writer::Fail()
{
    base::AutoLock lock(context_->lock());
    if (context_->result() == Ok) {
        // TODO(yhirano): Use an appropriate error code other than
        // UnexpectedError.
        context_->set_result(UnexpectedError);

        if (context_->is_two_phase_read_in_progress()) {
            // If we are in two-phase read session, we cannot discard the data. We
            // will clear the queue at the end of the session.
        } else {
            context_->ClearQueue();
        }

        context_->ResetOnReaderDetached();
        // We cannot issue the notification synchronously because this function can
        // be called in the client's callback.
        context_->PostNotify();
    }
}

SharedMemoryDataConsumerHandle::ReaderImpl::ReaderImpl(
    scoped_refptr<Context> context,
    Client* client)
    : context_(context)
{
    base::AutoLock lock(context_->lock());
    DCHECK(!context_->is_handle_locked());
    context_->AcquireReaderLock(client);
}

SharedMemoryDataConsumerHandle::ReaderImpl::~ReaderImpl()
{
    base::AutoLock lock(context_->lock());
    context_->ReleaseReaderLock();
    context_->ClearIfNecessary();
}

Result SharedMemoryDataConsumerHandle::ReaderImpl::read(
    void* data,
    size_t size,
    Flags flags,
    size_t* read_size_to_return)
{
    base::AutoLock lock(context_->lock());

    size_t total_read_size = 0;
    *read_size_to_return = 0;

    if (context_->result() == Ok && context_->is_two_phase_read_in_progress())
        context_->set_result(UnexpectedError);

    if (context_->result() != Ok && context_->result() != Done)
        return context_->result();

    while (!context_->IsEmpty() && total_read_size < size) {
        auto* top = context_->Top();
        size_t readable = top->length() - context_->first_offset();
        size_t writable = size - total_read_size;
        size_t read_size = std::min(readable, writable);
        const char* begin = top->payload() + context_->first_offset();
        std::copy(begin, begin + read_size,
            static_cast<char*>(data) + total_read_size);
        total_read_size += read_size;
        context_->Consume(read_size);
    }
    *read_size_to_return = total_read_size;
    if (total_read_size || !context_->IsEmpty())
        return Ok;
    if (context_->result() == Done)
        return Done;
    return ShouldWait;
}

Result SharedMemoryDataConsumerHandle::ReaderImpl::beginRead(
    const void** buffer,
    Flags flags,
    size_t* available)
{
    *buffer = nullptr;
    *available = 0;

    base::AutoLock lock(context_->lock());

    if (context_->result() == Ok && context_->is_two_phase_read_in_progress())
        context_->set_result(UnexpectedError);

    if (context_->result() != Ok && context_->result() != Done)
        return context_->result();

    if (context_->IsEmpty())
        return context_->result() == Done ? Done : ShouldWait;

    context_->set_is_two_phase_read_in_progress(true);
    auto* top = context_->Top();
    *buffer = top->payload() + context_->first_offset();
    *available = top->length() - context_->first_offset();

    return Ok;
}

Result SharedMemoryDataConsumerHandle::ReaderImpl::endRead(size_t read_size)
{
    base::AutoLock lock(context_->lock());

    if (!context_->is_two_phase_read_in_progress())
        return UnexpectedError;

    context_->set_is_two_phase_read_in_progress(false);
    if (context_->result() != Ok && context_->result() != Done) {
        // We have an error, so we can discard the stored data.
        context_->ClearQueue();
    } else {
        context_->Consume(read_size);
    }

    return Ok;
}

SharedMemoryDataConsumerHandle::SharedMemoryDataConsumerHandle(
    BackpressureMode mode,
    std::unique_ptr<Writer>* writer)
    : SharedMemoryDataConsumerHandle(mode, base::Closure(), writer)
{
}

SharedMemoryDataConsumerHandle::SharedMemoryDataConsumerHandle(
    BackpressureMode mode,
    const base::Closure& on_reader_detached,
    std::unique_ptr<Writer>* writer)
    : context_(new Context(on_reader_detached))
{
    writer->reset(new Writer(context_, mode));
}

SharedMemoryDataConsumerHandle::~SharedMemoryDataConsumerHandle()
{
    base::AutoLock lock(context_->lock());
    context_->set_is_handle_active(false);
    context_->ClearIfNecessary();
}

std::unique_ptr<blink::WebDataConsumerHandle::Reader>
SharedMemoryDataConsumerHandle::obtainReader(Client* client)
{
    return base::WrapUnique(new ReaderImpl(context_, client));
}

const char* SharedMemoryDataConsumerHandle::debugName() const
{
    return "SharedMemoryDataConsumerHandle";
}

} // namespace content
